Skip to content

LangGraph Store 集成方案

一、模块概述

属性说明
模块名称Store(长期记忆)
优先级🟡 P2(中高)
预估工时2-3 天
依赖项langgraph.store, PostgresStore/RedisStore

为什么需要

当前 Checkpointer 只能保存单次会话的状态,无法跨会话记住用户偏好。集成 Store 后可以:

  • 跨会话记住用户偏好(语言、风格等)
  • 实现个性化对话体验
  • 积累用户相关知识
  • 增强上下文理解能力

二、架构设计

2.1 Checkpointer vs Store 对比

┌─────────────────────────────────────────────────────────────────────┐
│                    Checkpointer vs Store 对比                         │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   Checkpointer (短期记忆)              Store (长期记忆)              │
│   ┌─────────────────┐                ┌─────────────────┐            │
│   │   会话 A        │                │                 │            │
│   │   thread_id: 1  │                │  User 123       │            │
│   │   messages: [...]│                │  ┌───────────┐ │            │
│   └─────────────────┘                │  │preferences│ │            │
│                                      │  │ - 语言:中文│ │            │
│   ┌─────────────────┐                │  │ - 风格:简洁│ │            │
│   │   会话 B        │                │  └───────────┘ │            │
│   │   thread_id: 2  │                │  ┌───────────┐ │            │
│   │   messages: [...]│                │  │ memories  │ │            │
│   └─────────────────┘                │  │ - 喜欢猫  │ │            │
│                                      │  │ - 程序员   │ │            │
│   ⚠️ 会话之间隔离                     │  └───────────┘ │            │
│   无法共享信息                         │                 │            │
│                                      │  ✅ 跨会话共享   │            │
│                                      └─────────────────┘            │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

2.2 Store 数据模型

python
# Store 使用 namespace 组织数据
# namespace 格式: (category, user_id, sub_category)

# 示例数据结构:
namespace = ("memories", "user_123", "preferences")
# 存储键值对
store.put(namespace, "language", {"value": "中文", "confidence": 0.9})
store.put(namespace, "style", {"value": "简洁", "confidence": 0.8})

namespace = ("memories", "user_123", "facts")
store.put(namespace, str(uuid.uuid4()), {"text": "用户是一名程序员", "source": "对话中提到"})
store.put(namespace, str(uuid.uuid4()), {"text": "用户喜欢猫", "source": "用户明确说明"})

2.3 状态设计

python
from typing import TypedDict, Optional, List, Any
from dataclasses import dataclass
from langgraph.graph import MessagesState
from langgraph.runtime import Runtime

@dataclass
class UserContext:
    """用户上下文"""
    user_id: str
    nickname: Optional[str] = None
    email: Optional[str] = None

class StateWithMemory(MessagesState):
    """带记忆的状态"""
    # 继承 messages
    user_id: str
    relevant_memories: Optional[List[str]] = None

三、代码实现

3.1 Store 初始化

修改文件: services/checkpointer.py,添加 Store 初始化:

python
"""Store 初始化 - 在现有 checkpointer.py 中添加"""

from contextlib import contextmanager
import os
from dotenv import load_dotenv

load_dotenv(override=True)


def get_mysql_connection_string() -> str:
    """获取 MySQL 连接字符串"""
    host = os.getenv("MYSQL_HOST", "localhost")
    port = os.getenv("MYSQL_PORT", "3306")
    user = os.getenv("MYSQL_USER", "root")
    password = os.getenv("MYSQL_PASSWORD", "root")
    database = os.getenv("MYSQL_DATABASE", "langchain_chat")
    return f"mysql+pymysql://{user}:{password}@{host}:{port}/{database}?charset=utf8mb4"


@contextmanager
def get_checkpointer():
    """获取 MySQL Checkpointer"""
    from langgraph.checkpoint.mysql.pymysql import PyMySQLSaver
    conn_string = get_mysql_connection_string()
    with PyMySQLSaver.from_conn_string(conn_string) as checkpointer:
        checkpointer.setup()
        yield checkpointer


@contextmanager
def get_async_checkpointer():
    """获取异步 MySQL Checkpointer"""
    from langgraph.checkpoint.mysql.asyncmy import AsyncPyMySQLSaver
    conn_string = get_mysql_connection_string()
    with AsyncPyMySQLSaver.from_conn_string(conn_string) as checkpointer:
        checkpointer.setup()
        yield checkpointer


# ============ 新增 Store 相关函数 ============

@contextmanager
def get_store():
    """
    获取 Store 上下文管理器

    使用 PostgreSQL 作为长期记忆存储(推荐)
    需要: pip install langgraph-checkpoint-postgres

    Example:
        with get_store() as store:
            memories = store.search(("memories", user_id))
    """
    # 使用 PostgreSQL Store(推荐,支持语义搜索)
    # from langgraph.store.postgres import PostgresStore
    # from langchain.embeddings import init_embeddings

    # 简化版:使用内存 Store(开发测试用)
    from langgraph.store.memory import InMemoryStore

    store = InMemoryStore()
    yield store


@contextmanager
def get_postgres_store():
    """
    获取 PostgreSQL Store(生产环境推荐)

    支持语义搜索,需要安装:
        pip install langgraph-checkpoint-postgres

    配置环境变量:
        POSTGRES_HOST=localhost
        POSTGRES_PORT=5432
        POSTGRES_USER=postgres
        POSTGRES_PASSWORD=postgres
        POSTGRES_DATABASE=langgraph_store
    """
    from langgraph.store.postgres import PostgresStore
    from langchain.embeddings import init_embeddings

    host = os.getenv("POSTGRES_HOST", "localhost")
    port = os.getenv("POSTGRES_PORT", "5432")
    user = os.getenv("POSTGRES_USER", "postgres")
    password = os.getenv("POSTGRES_PASSWORD", "postgres")
    database = os.getenv("POSTGRES_DATABASE", "langgraph_store")

    conn_string = f"postgresql://{user}:{password}@{host}:{port}/{database}"

    # 可选:启用语义搜索
    embeddings = init_embeddings("openai:text-embedding-3-small")

    with PostgresStore.from_conn_string(
        conn_string,
        index={"embed": embeddings, "dims": 1536}
    ) as store:
        store.setup()
        yield store

3.2 记忆管理服务

创建文件: services/memory_store.py

python
"""用户长期记忆管理服务

提供用户偏好的存储、检索和管理功能。
"""
import logging
import uuid
from typing import Optional, List, Dict, Any
from dataclasses import dataclass

logger = logging.getLogger(__name__)


@dataclass
class MemoryItem:
    """记忆项"""
    key: str
    value: Dict[str, Any]
    namespace: tuple


class MemoryService:
    """记忆管理服务"""

    def __init__(self):
        self._store = None

    def _get_namespace(self, user_id: str, category: str = "facts") -> tuple:
        """获取命名空间"""
        return ("memories", user_id, category)

    def store_preference(
        self,
        user_id: str,
        preference_type: str,
        value: Any,
        confidence: float = 1.0
    ) -> None:
        """
        存储用户偏好

        Args:
            user_id: 用户 ID
            preference_type: 偏好类型(language, style, topic 等)
            value: 偏好值
            confidence: 置信度 (0-1)
        """
        from services.checkpointer import get_store

        with get_store() as store:
            namespace = self._get_namespace(user_id, "preferences")
            store.put(
                namespace,
                preference_type,
                {"value": value, "confidence": confidence}
            )
            logger.info(f"存储用户偏好: {user_id}/{preference_type} = {value}")

    def get_preference(self, user_id: str, preference_type: str) -> Optional[Any]:
        """获取用户偏好"""
        from services.checkpointer import get_store

        with get_store() as store:
            namespace = self._get_namespace(user_id, "preferences")
            try:
                item = store.get(namespace, preference_type)
                return item.value if item else None
            except Exception as e:
                logger.warning(f"获取偏好失败: {e}")
                return None

    def store_fact(
        self,
        user_id: str,
        fact: str,
        source: str = "conversation"
    ) -> None:
        """
        存储用户事实信息

        Args:
            user_id: 用户 ID
            fact: 事实内容(如"用户是一名程序员")
            source: 来源(conversation, explicit 等)
        """
        from services.checkpointer import get_store

        with get_store() as store:
            namespace = self._get_namespace(user_id, "facts")
            store.put(
                namespace,
                str(uuid.uuid4()),
                {"text": fact, "source": source}
            )
            logger.info(f"存储用户事实: {user_id} - {fact[:50]}...")

    def search_memories(
        self,
        user_id: str,
        query: str,
        limit: int = 5
    ) -> List[MemoryItem]:
        """
        搜索用户相关记忆

        Args:
            user_id: 用户 ID
            query: 搜索查询
            limit: 返回数量限制

        Returns:
            匹配的记忆列表
        """
        from services.checkpointer import get_store

        with get_store() as store:
            namespace = self._get_namespace(user_id, "facts")
            try:
                items = store.search(namespace, query=query, limit=limit)
                return [
                    MemoryItem(key=item.key, value=item.value, namespace=namespace)
                    for item in items
                ]
            except Exception as e:
                logger.warning(f"搜索记忆失败: {e}")
                return []

    def get_all_preferences(self, user_id: str) -> Dict[str, Any]:
        """获取用户所有偏好"""
        from services.checkpointer import get_store

        with get_store() as store:
            namespace = self._get_namespace(user_id, "preferences")
            try:
                # 获取命名空间下所有项目
                items = store.search(namespace, limit=100)
                return {
                    item.key: item.value.get("value")
                    for item in items
                }
            except Exception as e:
                logger.warning(f"获取偏好失败: {e}")
                return {}

    def build_memory_context(self, user_id: str, query: str) -> str:
        """
        构建记忆上下文

        用于在系统提示中添加用户相关信息

        Args:
            user_id: 用户 ID
            query: 当前查询(用于语义搜索)

        Returns:
            格式化的记忆上下文字符串
        """
        preferences = self.get_all_preferences(user_id)
        relevant_facts = self.search_memories(user_id, query, limit=3)

        context_parts = []

        # 添加偏好
        if preferences:
            pref_str = ", ".join(f"{k}: {v}" for k, v in preferences.items())
            context_parts.append(f"用户偏好: {pref_str}")

        # 添加相关事实
        if relevant_facts:
            facts_str = "\n".join(f"- {f.value.get('text', '')}" for f in relevant_facts)
            context_parts.append(f"关于用户的信息:\n{facts_str}")

        if context_parts:
            return "\n\n".join(context_parts)
        return ""


# 全局实例
_memory_service: Optional[MemoryService] = None


def get_memory_service() -> MemoryService:
    """获取记忆服务单例"""
    global _memory_service
    if _memory_service is None:
        _memory_service = MemoryService()
    return _memory_service

3.3 带 Store 的 Agent

修改 services/langgraph_chat.py,添加 Store 支持:

python
"""带 Store 支持的聊天服务

在原有基础上添加长期记忆功能。
"""
import os
import logging
from typing import Optional, Iterator, Dict, Any, List
from dataclasses import dataclass, field
from dotenv import load_dotenv

from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI
from langgraph.graph import START, StateGraph, MessagesState

from services.memory_store import get_memory_service

load_dotenv(override=True)
logger = logging.getLogger(__name__)


@dataclass
class ChatConfig:
    """聊天配置"""
    api_key: str = field(default_factory=lambda: os.getenv("OPENROUTER_API_KEY", ""))
    base_url: str = field(default_factory=lambda: os.getenv("OPENROUTER_BASE_URL", "https://openrouter.ai/api/v1"))
    default_model: str = field(default_factory=lambda: os.getenv("OPENROUTER_MODEL", "openai/gpt-4o"))
    default_system_prompt: str = "你是一个友好的 AI 助手,请用中文回复。"


class MemoryEnabledChatService:
    """带长期记忆的聊天服务"""

    def __init__(self, config: Optional[ChatConfig] = None):
        self.config = config or ChatConfig()
        self.memory_service = get_memory_service()

    def _get_llm(self, model: Optional[str] = None, temperature: float = 0.7) -> ChatOpenAI:
        """获取 LLM 实例"""
        return ChatOpenAI(
            model=model or self.config.default_model,
            api_key=self.config.api_key,
            base_url=self.config.base_url,
            temperature=temperature
        )

    def _call_model_with_memory(
        self,
        state: MessagesState,
        model: Optional[str] = None,
        system_prompt: Optional[str] = None,
        user_id: Optional[str] = None
    ):
        """调用模型(带记忆上下文)"""
        llm = self._get_llm(model)

        # 构建记忆上下文
        memory_context = ""
        if user_id:
            query = str(state["messages"][-1].content) if state["messages"] else ""
            memory_context = self.memory_service.build_memory_context(user_id, query)

            # 检测是否需要存储新记忆
            self._maybe_store_memory(user_id, state)

        # 构建系统提示
        if memory_context:
            enhanced_system_prompt = f"""{system_prompt or self.config.default_system_prompt}

## 用户相关信息
{memory_context}

请根据用户的偏好和历史信息提供个性化的回答。"""
        else:
            enhanced_system_prompt = system_prompt or self.config.default_system_prompt

        # 调用模型
        messages = [
            SystemMessage(content=enhanced_system_prompt),
            *state["messages"]
        ]
        response = llm.invoke(messages)
        return {"messages": [response]}

    def _maybe_store_memory(self, user_id: str, state: MessagesState) -> None:
        """检测并存储新记忆"""
        if not state["messages"]:
            return

        last_message = state["messages"][-1]
        content = str(last_message.content).lower()

        # 检测用户声明偏好
        preference_patterns = {
            "language": ["我喜欢用中文", "请用中文", "用中文回复", "我习惯用英文"],
            "style": ["简洁一点", "详细一点", "简短回答", "详细解释"],
        }

        for pref_type, patterns in preference_patterns.items():
            for pattern in patterns:
                if pattern in content:
                    value = "中文" if "中文" in pattern else "简洁" if "简洁" in pattern or "简短" in pattern else "详细"
                    self.memory_service.store_preference(user_id, pref_type, value)
                    logger.info(f"自动检测到用户偏好: {pref_type} = {value}")
                    return

        # 检测用户声明事实
        fact_keywords = ["我是", "我叫", "我住在", "我在", "我喜欢", "我的工作是", "我是做"]
        for keyword in fact_keywords:
            if keyword in content:
                # 提取事实(简化版,实际可以用 LLM 提取)
                self.memory_service.store_fact(user_id, last_message.content, source="explicit")
                return

    def chat_stream_with_memory(
        self,
        thread_id: str,
        prompt: str,
        user_id: Optional[str] = None,
        model: Optional[str] = None,
        system_prompt: Optional[str] = None,
        images: Optional[List[str]] = None,
        temperature: float = 0.7,
        **kwargs
    ) -> Iterator[str]:
        """
        流式对话(带长期记忆)

        Args:
            thread_id: 会话 ID
            prompt: 用户输入
            user_id: 用户 ID(用于检索长期记忆)
            model: 模型名称
            system_prompt: 系统提示
            images: 图片列表
            temperature: 温度参数
        """
        from services.checkpointer import get_checkpointer

        # 构建消息内容
        content = prompt
        if images:
            # 处理图片逻辑...
            pass

        with get_checkpointer() as checkpointer:
            # 构建带记忆的工作流
            def call_model(state: MessagesState):
                return self._call_model_with_memory(
                    state, model, system_prompt, user_id
                )

            workflow = StateGraph(MessagesState)
            workflow.add_node("agent", call_model)
            workflow.add_edge(START, "agent")
            app = workflow.compile(checkpointer=checkpointer)

            config = {"configurable": {"thread_id": thread_id}}

            for chunk in app.stream(
                {"messages": [HumanMessage(content=content)]},
                config,
                stream_mode="messages"
            ):
                if isinstance(chunk, tuple) and len(chunk) == 2:
                    message_chunk, metadata = chunk
                    if hasattr(message_chunk, 'content') and message_chunk.content:
                        yield message_chunk.content


# 全局服务实例
_memory_chat_service: Optional[MemoryEnabledChatService] = None


def get_memory_chat_service() -> MemoryEnabledChatService:
    """获取带记忆的聊天服务单例"""
    global _memory_chat_service
    if _memory_chat_service is None:
        _memory_chat_service = MemoryEnabledChatService()
    return _memory_chat_service

四、API 集成

4.1 修改现有 API

修改 api/chat.py,在聊天时传入用户 ID:

python
from services.langgraph_chat import get_memory_chat_service

@router.post("/chat/stream")
async def chat_stream(
    data: ChatRequest,
    request: Request,
    db: Session = Depends(get_db)
):
    """流式聊天接口(带长期记忆)"""
    session = get_session(request)
    user_id = session.get("user", {}).get("user_id")

    # ... 其他代码 ...

    # 使用带记忆的服务
    chat_service = get_memory_chat_service()

    for chunk in chat_service.chat_stream_with_memory(
        thread_id=thread_id or "anonymous",
        prompt=data.prompt,
        user_id=user_id,  # 传入用户 ID
        model=data.model,
        system_prompt=data.system_prompt,
        images=llm_image_urls,
        user_info=user_info
    ):
        yield chunk

4.2 新增记忆管理 API

创建 api/memory.py:

python
"""记忆管理 API"""
from fastapi import APIRouter, Depends, Request
from sqlalchemy.orm import Session
from typing import Optional, List

from core.database import get_db
from middlewares.auth import get_session
from services.memory_store import get_memory_service

router = APIRouter(prefix="/api/memory", tags=["memory"])


@router.get("/preferences")
async def get_preferences(request: Request):
    """获取用户偏好"""
    session = get_session(request)
    user_id = session.get("user", {}).get("user_id")

    if not user_id:
        return {"success": False, "error": "未登录"}

    memory_service = get_memory_service()
    preferences = memory_service.get_all_preferences(user_id)

    return {"success": True, "preferences": preferences}


@router.put("/preferences/{preference_type}")
async def update_preference(
    preference_type: str,
    value: str,
    request: Request
):
    """更新用户偏好"""
    session = get_session(request)
    user_id = session.get("user", {}).get("user_id")

    if not user_id:
        return {"success": False, "error": "未登录"}

    memory_service = get_memory_service()
    memory_service.store_preference(user_id, preference_type, value)

    return {"success": True, "preference_type": preference_type, "value": value}


@router.get("/facts")
async def get_facts(request: Request, limit: int = 10):
    """获取用户事实信息"""
    session = get_session(request)
    user_id = session.get("user", {}).get("user_id")

    if not user_id:
        return {"success": False, "error": "未登录"}

    memory_service = get_memory_service()
    facts = memory_service.search_memories(user_id, "", limit=limit)

    return {
        "success": True,
        "facts": [
            {"key": f.key, "text": f.value.get("text"), "source": f.value.get("source")}
            for f in facts
        ]
    }


@router.delete("/facts/{fact_key}")
async def delete_fact(fact_key: str, request: Request):
    """删除用户事实"""
    session = get_session(request)
    user_id = session.get("user", {}).get("user_id")

    if not user_id:
        return {"success": False, "error": "未登录"}

    # TODO: 实现删除逻辑
    return {"success": True}

五、前端集成

5.1 偏好设置组件

添加到 static/js/settings.js:

javascript
class UserPreferences {
    constructor() {
        this.preferences = {};
        this.init();
    }

    async init() {
        await this.loadPreferences();
        this.render();
    }

    async loadPreferences() {
        try {
            const response = await fetch('/api/memory/preferences');
            const data = await response.json();
            if (data.success) {
                this.preferences = data.preferences;
            }
        } catch (error) {
            console.error('加载偏好失败:', error);
        }
    }

    async setPreference(type, value) {
        try {
            const response = await fetch(`/api/memory/preferences/${type}?value=${encodeURIComponent(value)}`, {
                method: 'PUT'
            });
            const data = await response.json();
            if (data.success) {
                this.preferences[type] = value;
                this.showToast(`已保存偏好: ${type} = ${value}`);
            }
        } catch (error) {
            console.error('保存偏好失败:', error);
        }
    }

    render() {
        // 在设置面板中渲染偏好选项
        const container = document.getElementById('preferences-container');
        if (!container) return;

        container.innerHTML = `
            <div class="preference-section">
                <h4>对话偏好</h4>

                <div class="preference-item">
                    <label>回复语言</label>
                    <select id="pref-language" onchange="userPrefs.setPreference('language', this.value)">
                        <option value="中文" ${this.preferences.language === '中文' ? 'selected' : ''}>中文</option>
                        <option value="英文" ${this.preferences.language === '英文' ? 'selected' : ''}>英文</option>
                    </select>
                </div>

                <div class="preference-item">
                    <label>回复风格</label>
                    <select id="pref-style" onchange="userPrefs.setPreference('style', this.value)">
                        <option value="简洁" ${this.preferences.style === '简洁' ? 'selected' : ''}>简洁</option>
                        <option value="详细" ${this.preferences.style === '详细' ? 'selected' : ''}>详细</option>
                    </select>
                </div>
            </div>

            <div class="preference-section">
                <h4>关于我</h4>
                <p class="hint">AI 会记住这些信息,提供个性化回答</p>
                <div id="user-facts"></div>
            </div>
        `;

        this.loadFacts();
    }

    async loadFacts() {
        try {
            const response = await fetch('/api/memory/facts?limit=5');
            const data = await response.json();
            if (data.success) {
                const factsContainer = document.getElementById('user-facts');
                factsContainer.innerHTML = data.facts.map(f => `
                    <div class="fact-item">
                        <span>${f.text}</span>
                        <button onclick="userPrefs.deleteFact('${f.key}')" class="btn-delete">×</button>
                    </div>
                `).join('');
            }
        } catch (error) {
            console.error('加载事实失败:', error);
        }
    }

    showToast(message) {
        // 显示提示消息
        const toast = document.createElement('div');
        toast.className = 'toast';
        toast.textContent = message;
        document.body.appendChild(toast);
        setTimeout(() => toast.remove(), 3000);
    }
}

const userPrefs = new UserPreferences();

六、数据库准备

6.1 PostgreSQL Store 表(如使用 PostgreSQL)

sql
-- Store 表会自动创建,以下为参考结构
CREATE TABLE IF NOT EXISTS store (
    id SERIAL PRIMARY KEY,
    namespace VARCHAR(255) NOT NULL,
    key VARCHAR(255) NOT NULL,
    value JSONB NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(namespace, key)
);

CREATE INDEX idx_store_namespace ON store(namespace);

七、测试计划

7.1 单元测试

python
# tests/test_memory_store.py
import pytest
from services.memory_store import MemoryService


def test_store_preference():
    """测试存储偏好"""
    service = MemoryService()
    service.store_preference("user_123", "language", "中文")

    value = service.get_preference("user_123", "language")
    assert value == "中文"


def test_store_fact():
    """测试存储事实"""
    service = MemoryService()
    service.store_fact("user_123", "用户是一名程序员")

    facts = service.search_memories("user_123", "程序员")
    assert len(facts) > 0
    assert "程序员" in facts[0].value["text"]


def test_build_memory_context():
    """测试构建记忆上下文"""
    service = MemoryService()
    service.store_preference("user_123", "language", "中文")
    service.store_fact("user_123", "用户喜欢猫")

    context = service.build_memory_context("user_123", "猫")
    assert "中文" in context
    assert "猫" in context

八、实施步骤

步骤 1: Store 基础设施(0.5 天)

  • 修改 services/checkpointer.py,添加 Store 初始化
  • 配置 PostgreSQL 或使用内存 Store
  • 测试 Store 连接

步骤 2: 记忆管理服务(1 天)

  • 创建 services/memory_store.py
  • 实现偏好存储和检索
  • 实现事实存储和语义搜索
  • 编写单元测试

步骤 3: Agent 集成(0.5 天)

  • 修改 services/langgraph_chat.py
  • 添加记忆上下文构建
  • 实现自动记忆提取

步骤 4: API 集成(0.5 天)

  • 创建 api/memory.py
  • 修改 api/chat.py 传入用户 ID
  • 测试 API

步骤 5: 前端集成(0.5 天)

  • 创建偏好设置组件
  • 集成到设置页面
  • 测试完整流程

九、相关文档